Vector Autoregression (VAR) from scratch (NumPy)#

Goals#

  • Model multivariate time series jointly (not one series at a time)

  • Understand and visualize cross-dependencies between variables

  • Understand stationarity/stability in multivariate systems

  • Select lag order p with information criteria

  • Implement VAR estimation + impulse response functions (IRFs) with NumPy

  • Visualize variable interactions and IRFs with Plotly

import sys

import numpy as np
import pandas as pd
import plotly
import plotly.graph_objects as go
import os
import plotly.io as pio
from plotly.subplots import make_subplots

pio.templates.default = "plotly_white"
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")

np.set_printoptions(precision=4, suppress=True)

SEED = 42
rng = np.random.default_rng(SEED)

print("Python:", sys.version.split()[0])
print("NumPy:", np.__version__)
print("Pandas:", pd.__version__)
print("Plotly:", plotly.__version__)
Python: 3.12.9
NumPy: 1.26.2
Pandas: 2.1.3
Plotly: 6.5.2

1) Multivariate time series modeling (what VAR is)#

A multivariate time series observes multiple variables at each time step.

  • Let the observation at time t be a vector:

\[\begin{split} \mathbf{y}_t = \begin{bmatrix} y_{1,t}\\ y_{2,t}\\ \vdots\\ y_{k,t}\end{bmatrix} \in \mathbb{R}^k \end{split}\]
  • A VAR(p) model explains current values using p past lags of all variables:

\[ \mathbf{y}_t = \mathbf{c} + \mathbf{A}_1\mathbf{y}_{t-1} + \mathbf{A}_2\mathbf{y}_{t-2} + \cdots + \mathbf{A}_p\mathbf{y}_{t-p} + \mathbf{u}_t \]

where:

  • \(\mathbf{c} \in \mathbb{R}^k\) is an intercept vector

  • \(\mathbf{A}_\ell \in \mathbb{R}^{k\times k}\) are coefficient matrices (one per lag)

  • \(\mathbf{u}_t \in \mathbb{R}^k\) is a zero-mean innovation (shock) with covariance \(\Sigma_u\).

2) Cross-dependencies between variables#

The key difference from fitting k separate AR models is that VAR includes cross-lag terms.

For k = 2 and p = 1:

\[\begin{split} \begin{bmatrix} x_t \\ y_t \end{bmatrix} = \begin{bmatrix} c_x \\ c_y \end{bmatrix} + \begin{bmatrix} a_{11} & a_{12} \\ a_{21} & a_{22} \end{bmatrix} \begin{bmatrix} x_{t-1} \\ y_{t-1} \end{bmatrix} + \begin{bmatrix} u_{x,t} \\ u_{y,t} \end{bmatrix} \end{split}\]
  • \(a_{12}\) is the effect of \(y_{t-1}\) on \(x_t\)

  • \(a_{21}\) is the effect of \(x_{t-1}\) on \(y_t\)

These off-diagonal terms let VAR capture feedback loops (e.g., policy ↔ economy, supply ↔ price).
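
To make the coupling concrete, here is a minimal standalone sketch (with made-up coefficients, no intercept and no further shocks) iterating the k = 2, p = 1 system above: a one-time unit shock to x propagates into y on the next step purely through \(a_{21}\).

```python
import numpy as np

# Hypothetical VAR(1) coefficient matrix: a21 = 0.5 couples x_{t-1} into y_t
A = np.array([[0.5, 0.0],
              [0.5, 0.3]])

y_prev = np.array([1.0, 0.0])  # unit shock to x only; y starts at 0
path = [y_prev]
for _ in range(3):
    y_prev = A @ y_prev  # deterministic propagation of the initial shock
    path.append(y_prev)

for t, v in enumerate(path):
    print(t, v)
```

After one step y has moved to 0.5 even though only x was shocked; with two separate AR(1) models (\(a_{12} = a_{21} = 0\)) y would stay at zero forever.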

3) Stationarity / stability in multivariate systems#

For univariate AR models, stationarity relates to roots of a characteristic polynomial. For VAR, the analogous concept is stability: shocks should not explode over time.

VAR(1)#

\[ \mathbf{y}_t = \mathbf{c} + \mathbf{A}_1\mathbf{y}_{t-1} + \mathbf{u}_t \]

The process is stable (and, with i.i.d. shocks, stationary) if and only if:

\[ \rho(\mathbf{A}_1) < 1 \]

where \(\rho(\cdot)\) is the spectral radius (largest absolute eigenvalue).
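
A quick numerical check of this condition, using two illustrative matrices (not the ones estimated below):

```python
import numpy as np

def spectral_radius(A: np.ndarray) -> float:
    """Largest eigenvalue magnitude of a square matrix."""
    return float(np.max(np.abs(np.linalg.eigvals(A))))

A_stable = np.array([[0.5, 0.2],
                     [0.1, 0.4]])
A_explosive = np.array([[1.1, 0.0],
                        [0.0, 0.5]])

print(spectral_radius(A_stable))     # 0.6 < 1: shocks die out
print(spectral_radius(A_explosive))  # 1.1 > 1: shocks compound
```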

VAR(p) companion form#

Define the state vector:

\[\begin{split} \mathbf{s}_t = \begin{bmatrix} \mathbf{y}_t \\ \mathbf{y}_{t-1} \\ \vdots \\ \mathbf{y}_{t-p+1} \end{bmatrix} \in \mathbb{R}^{kp} \end{split}\]

and the companion matrix \(\mathbf{F} \in \mathbb{R}^{kp\times kp}\):

\[\begin{split} \mathbf{F}= \begin{bmatrix} \mathbf{A}_1 & \mathbf{A}_2 & \cdots & \mathbf{A}_{p-1} & \mathbf{A}_p \\ \mathbf{I}_k & \mathbf{0} & \cdots & \mathbf{0} & \mathbf{0} \\ \mathbf{0} & \mathbf{I}_k & \cdots & \mathbf{0} & \mathbf{0} \\ \vdots & & \ddots & & \vdots \\ \mathbf{0} & \mathbf{0} & \cdots & \mathbf{I}_k & \mathbf{0} \end{bmatrix} \end{split}\]

Then the system is stable/stationary if all eigenvalues of \(\mathbf{F}\) lie inside the unit circle.

def companion_matrix(A_list: list[np.ndarray]) -> np.ndarray:
    """Build the VAR(p) companion matrix F of shape (k*p, k*p)."""
    if len(A_list) == 0:
        raise ValueError("A_list must be non-empty")

    k = A_list[0].shape[0]
    p = len(A_list)
    if any(A.shape != (k, k) for A in A_list):
        raise ValueError("All A_i must have shape (k, k)")

    F = np.zeros((k * p, k * p), dtype=float)
    F[:k, : k * p] = np.concatenate(A_list, axis=1)
    if p > 1:
        F[k:, :-k] = np.eye(k * (p - 1))
    return F


def check_stationarity(A_list: list[np.ndarray]) -> dict:
    """Return eigenvalues and a stationarity flag based on the companion matrix."""
    F = companion_matrix(A_list)
    eigvals = np.linalg.eigvals(F)
    spectral_radius = float(np.max(np.abs(eigvals)))
    return {
        "is_stationary": spectral_radius < 1.0,
        "spectral_radius": spectral_radius,
        "eigvals": eigvals,
    }

4) Low-level NumPy estimation (OLS)#

With observations \(\{\mathbf{y}_t\}_{t=1}^T\) and lag order p, define the stacked matrices:

  • Dependent block:

\[\begin{split} \mathbf{Y} = \begin{bmatrix} \mathbf{y}_{p+1}^\top \\ \mathbf{y}_{p+2}^\top \\ \vdots \\ \mathbf{y}_{T}^\top \end{bmatrix} \in \mathbb{R}^{(T-p)\times k} \end{split}\]

  • Design matrix (constant + lags):

\[\begin{split} \mathbf{X} = \begin{bmatrix} 1 & \mathbf{y}_{p}^\top & \mathbf{y}_{p-1}^\top & \cdots & \mathbf{y}_{1}^\top \\ 1 & \mathbf{y}_{p+1}^\top & \mathbf{y}_{p}^\top & \cdots & \mathbf{y}_{2}^\top \\ \vdots & \vdots & \vdots & & \vdots \\ 1 & \mathbf{y}_{T-1}^\top & \mathbf{y}_{T-2}^\top & \cdots & \mathbf{y}_{T-p}^\top \end{bmatrix} \in \mathbb{R}^{(T-p)\times (1+kp)} \end{split}\]

Collect coefficients into:

\[ \mathbf{B} = \begin{bmatrix} \mathbf{c} & \mathbf{A}_1 & \cdots & \mathbf{A}_p \end{bmatrix}^\top \in \mathbb{R}^{(1+kp)\times k} \]

Then:

\[ \mathbf{Y} = \mathbf{X}\mathbf{B} + \mathbf{U} \]

and the OLS estimator is:

\[ \hat{\mathbf{B}} = (\mathbf{X}^\top\mathbf{X})^{-1}\mathbf{X}^\top\mathbf{Y} \]

We’ll compute it with np.linalg.lstsq (numerically safer than forming the inverse explicitly).

def build_var_matrices(y: np.ndarray, p: int, include_const: bool = True) -> tuple[np.ndarray, np.ndarray]:
    """Construct (X, Y) for VAR(p) with rows as time and columns as variables.

    y: (T, k)
    X: (T-p, (1 if include_const else 0) + k*p)
    Y: (T-p, k)
    """
    y = np.asarray(y, dtype=float)
    if y.ndim != 2:
        raise ValueError("y must have shape (T, k)")
    if p < 1:
        raise ValueError("p must be >= 1")

    n_steps, k = y.shape
    if n_steps <= p:
        raise ValueError("Need more observations than p")

    Y = y[p:, :]
    X_parts = []
    if include_const:
        X_parts.append(np.ones((n_steps - p, 1)))
    for lag in range(1, p + 1):
        X_parts.append(y[p - lag : n_steps - lag, :])
    X = np.concatenate(X_parts, axis=1)
    return X, Y
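
As a standalone sanity check on the stacking (toy numbers, re-deriving the slices inline rather than calling the function above):

```python
import numpy as np

T, k, p = 6, 2, 2
y = np.arange(T * k, dtype=float).reshape(T, k)  # row t is [2t, 2t+1]

Y = y[p:, :]  # rows p..T-1
X = np.concatenate(
    [np.ones((T - p, 1))] + [y[p - lag : T - lag, :] for lag in range(1, p + 1)],
    axis=1,
)

print(Y.shape, X.shape)  # (4, 2) (4, 5): T-p rows, 1 + k*p regressors
print(X[0])              # [1, y_{p-1}..., y_{p-2}...] (0-based) = [1, 2, 3, 0, 1]
```

The first row of X pairs the constant with lag 1 and lag 2 of the first predictable observation, matching the \(\mathbf{X}\) layout above.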


def fit_var_ols(y: np.ndarray, p: int, include_const: bool = True) -> dict:
    """Fit VAR(p) with equation-by-equation OLS (equivalently multivariate least squares)."""
    X, Y = build_var_matrices(y=y, p=p, include_const=include_const)
    beta_hat, *_ = np.linalg.lstsq(X, Y, rcond=None)  # (n_features, k)

    Y_hat = X @ beta_hat
    residuals = Y - Y_hat

    effective_T = Y.shape[0]
    Sigma_u = (residuals.T @ residuals) / effective_T  # MLE estimate (divides by T, not T - n_params)

    k = Y.shape[1]
    start = 0
    if include_const:
        intercept = beta_hat[0, :]
        start = 1
    else:
        intercept = np.zeros(k)

    A_list = []
    for lag_index in range(p):
        block = beta_hat[start + lag_index * k : start + (lag_index + 1) * k, :]  # (k, k)
        A_list.append(block.T)

    return {
        "p": p,
        "k": k,
        "include_const": include_const,
        "beta": beta_hat,
        "intercept": intercept,
        "A": A_list,
        "residuals": residuals,
        "Sigma_u": Sigma_u,
        "X": X,
        "Y": Y,
    }

5) Lag selection (choosing p)#

Increasing p makes the model more flexible, but it increases parameters quickly:

  • With k variables, each lag adds k×k coefficients.

  • With p lags, there are \(k^2 p\) coefficients in total (plus k intercepts).

A common approach is to fit VAR models for p = 1..p_max and pick the lag that minimizes an information criterion.

Using the residual covariance estimate \(\hat{\Sigma}_u(p)\) and parameter count \(m(p) = k^2 p + k\) (if intercept included):

\[ \mathrm{AIC}(p) = \log\det\hat{\Sigma}_u(p) + \frac{2m(p)}{T_p} \]
\[ \mathrm{BIC}(p) = \log\det\hat{\Sigma}_u(p) + \frac{\log(T_p)\,m(p)}{T_p} \]

where \(T_p = T - p\) is the number of usable rows after lagging.
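
To see how fast the penalty terms grow, a small worked sketch with illustrative values of k and T (only the penalties are computed here; the log-det term depends on the fitted residuals):

```python
import numpy as np

k, T = 3, 800
for p in (1, 2, 4, 8):
    m = k * k * p + k                 # coefficients plus intercepts
    T_p = T - p                       # usable rows after lagging
    penalty_aic = 2.0 * m / T_p
    penalty_bic = np.log(T_p) * m / T_p
    print(p, m, round(penalty_aic, 4), round(penalty_bic, 4))
```

Because \(\log T_p > 2\) for any reasonable sample size, the BIC penalty per extra lag is steeper, which is why BIC tends to select smaller p.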

def _slogdet_psd(matrix: np.ndarray, jitter: float = 1e-10, max_tries: int = 6) -> float:
    """Stable log(det(.)) for covariance-like matrices, with diagonal jitter if needed."""
    matrix = np.asarray(matrix, dtype=float)
    if matrix.shape[0] != matrix.shape[1]:
        raise ValueError("matrix must be square")

    jitter_value = float(jitter)
    for _ in range(max_tries):
        sign, logdet = np.linalg.slogdet(matrix)
        if sign > 0:
            return float(logdet)
        matrix = matrix + jitter_value * np.eye(matrix.shape[0])
        jitter_value *= 10.0

    raise np.linalg.LinAlgError("slogdet failed: matrix not positive definite after jitter")


def information_criteria_var(y: np.ndarray, p: int, include_const: bool = True) -> dict:
    model = fit_var_ols(y=y, p=p, include_const=include_const)
    k = model["k"]
    T_p = model["Y"].shape[0]

    n_params = (k * k * p) + (k if include_const else 0)
    logdet = _slogdet_psd(model["Sigma_u"])

    aic = logdet + 2.0 * n_params / T_p
    bic = logdet + np.log(T_p) * n_params / T_p
    return {
        "p": p,
        "aic": float(aic),
        "bic": float(bic),
        "n_params": int(n_params),
        "T_p": int(T_p),
    }


def select_lag(y: np.ndarray, p_max: int, include_const: bool = True) -> tuple[pd.DataFrame, dict]:
    records = [information_criteria_var(y=y, p=p, include_const=include_const) for p in range(1, p_max + 1)]
    table = pd.DataFrame.from_records(records).set_index("p").sort_index()
    best = {
        "aic": int(table["aic"].idxmin()),
        "bic": int(table["bic"].idxmin()),
    }
    return table, best

6) Intuition: an interacting system (simulate a stable VAR)#

Think of a VAR as a simple discrete-time dynamical system:

  • Each variable responds to its own past (inertia / momentum)

  • Each variable can respond to other variables’ past (coupling / interaction)

We’ll simulate a 3-variable VAR(2) with cross-effects, then:

  1. choose a lag order

  2. fit the coefficients

  3. visualize interactions and IRFs.

def cholesky_with_jitter(matrix: np.ndarray, jitter: float = 1e-10, max_tries: int = 6) -> np.ndarray:
    matrix = np.asarray(matrix, dtype=float)
    jitter_value = float(jitter)
    for _ in range(max_tries):
        try:
            return np.linalg.cholesky(matrix)
        except np.linalg.LinAlgError:
            matrix = matrix + jitter_value * np.eye(matrix.shape[0])
            jitter_value *= 10.0
    raise np.linalg.LinAlgError("Cholesky failed: matrix not positive definite after jitter")


def simulate_var(
    A_list: list[np.ndarray],
    Sigma_u: np.ndarray,
    n_steps: int,
    *,
    intercept: np.ndarray | None = None,
    burn_in: int = 200,
    rng: np.random.Generator | None = None,
) -> np.ndarray:
    """Simulate y_t = c + sum_i A_i y_{t-i} + u_t.

    Returns y with shape (n_steps, k).
    """
    if rng is None:
        rng = np.random.default_rng(0)

    k = A_list[0].shape[0]
    p = len(A_list)
    if intercept is None:
        intercept = np.zeros(k)

    chol = cholesky_with_jitter(Sigma_u)

    y = np.zeros((n_steps + burn_in + p, k), dtype=float)
    for t in range(p, y.shape[0]):
        ar_part = np.zeros(k)
        for lag_index, A in enumerate(A_list, start=1):
            ar_part = ar_part + A @ y[t - lag_index]
        shock = chol @ rng.standard_normal(k)
        y[t] = intercept + ar_part + shock

    return y[burn_in + p :, :]


names = ["Output", "Inflation", "PolicyRate"]

# A stable VAR(2) with cross-dependencies (off-diagonals)
A1_true = np.array(
    [
        [0.55, 0.10, -0.15],
        [0.15, 0.45, 0.05],
        [-0.20, 0.25, 0.40],
    ]
)
A2_true = np.array(
    [
        [-0.15, 0.00, 0.00],
        [0.00, -0.10, 0.00],
        [0.00, 0.00, -0.10],
    ]
)

Sigma_u_true = np.array(
    [
        [1.00, 0.30, 0.15],
        [0.30, 1.00, 0.10],
        [0.15, 0.10, 1.00],
    ]
)

stationarity_true = check_stationarity([A1_true, A2_true])
stationarity_true
{'is_stationary': True,
 'spectral_radius': 0.4152628017403557,
 'eigvals': array([0.0926+0.3248j, 0.0926-0.3248j, 0.3683+0.1919j, 0.3683-0.1919j,
        0.2392+0.1381j, 0.2392-0.1381j])}
y = simulate_var([A1_true, A2_true], Sigma_u=Sigma_u_true, n_steps=800, rng=rng)
df = pd.DataFrame(y, columns=names)
df.head()
Output Inflation PolicyRate
0 1.196020 -0.288455 0.672731
1 -0.200209 -0.739549 0.524796
2 0.320734 0.348595 -1.525616
3 1.008172 -0.622605 -0.386434
4 -0.921646 -0.184484 -1.343241
fig = go.Figure()
for name in names:
    fig.add_trace(go.Scatter(x=df.index, y=df[name], mode="lines", name=name))

fig.update_layout(
    title="Simulated interacting system (VAR(2))",
    xaxis_title="t",
    yaxis_title="value",
    legend_title="variable",
    height=350,
)
fig.show()
ic_table, best = select_lag(df.values, p_max=6, include_const=True)
ic_table
aic bic n_params T_p
p
1 0.059707 0.130046 12 799
2 -0.004962 0.118251 21 798
3 0.005384 0.181576 30 797
4 0.008751 0.238028 39 796
5 0.017841 0.300307 48 795
6 0.039101 0.374861 57 794
best
{'aic': 2, 'bic': 2}
p_hat = best["bic"]
model = fit_var_ols(df.values, p=p_hat, include_const=True)

stationarity_hat = check_stationarity(model["A"])
{k: stationarity_hat[k] for k in ["is_stationary", "spectral_radius"]}
{'is_stationary': True, 'spectral_radius': 0.4896672335687779}
def plot_var_coefficients(A_list: list[np.ndarray], names: list[str]) -> go.Figure:
    k = A_list[0].shape[0]
    p = len(A_list)
    zmax = max(float(np.max(np.abs(A))) for A in A_list)

    fig = make_subplots(
        rows=1,
        cols=p,
        shared_yaxes=True,
        subplot_titles=[f"Lag {i}" for i in range(1, p + 1)],
    )

    for col, A in enumerate(A_list, start=1):
        fig.add_trace(
            go.Heatmap(
                z=A,
                x=names,
                y=names,
                colorscale="RdBu",
                zmin=-zmax,
                zmax=zmax,
                showscale=col == 1,
                colorbar=dict(title="coef"),
            ),
            row=1,
            col=col,
        )

    fig.update_layout(
        title="Estimated variable interactions (A matrices)",
        height=360,
        width=320 * p,
    )
    fig.update_xaxes(title_text="Source (lagged variable)")
    fig.update_yaxes(title_text="Target (current variable)")
    return fig


plot_var_coefficients(model["A"], names).show()

7) Impulse response functions (IRFs)#

IRFs answer: “If I shock variable j today, how do variables respond over future horizons?”

A VAR has a moving-average representation:

\[ \mathbf{y}_t = \mu + \sum_{h=0}^{\infty} \Psi_h\,\mathbf{u}_{t-h} \]

where each \(\Psi_h \in \mathbb{R}^{k\times k}\) maps a shock at horizon h to responses.

  • For stable VARs, \(\Psi_h \to 0\) as \(h \to \infty\).

  • If shocks are correlated (\(\Sigma_u\) not diagonal), IRFs are often orthogonalized via a Cholesky factorization of \(\Sigma_u\). This introduces an ordering assumption (the variable order matters).

def irf_matrices(A_list: list[np.ndarray], horizons: int) -> np.ndarray:
    """Return Psi_h for h=0..H with shape (H+1, k, k)."""
    k = A_list[0].shape[0]
    p = len(A_list)
    F = companion_matrix(A_list)

    Psi = np.empty((horizons + 1, k, k), dtype=float)
    F_power = np.eye(k * p)
    for h in range(horizons + 1):
        Psi[h] = F_power[:k, :k]
        F_power = F_power @ F
    return Psi


def orthogonalize_irf(Psi: np.ndarray, Sigma_u: np.ndarray) -> np.ndarray:
    chol = cholesky_with_jitter(Sigma_u)
    return Psi @ chol
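
An equivalent way to obtain \(\Psi_h\) is the MA recursion \(\Psi_0 = \mathbf{I}\), \(\Psi_h = \sum_{i=1}^{\min(h,p)} \mathbf{A}_i \Psi_{h-i}\). A standalone sketch (made-up coefficient matrices) checking that it agrees with the companion-power approach used above:

```python
import numpy as np

A1 = np.array([[0.5, 0.1], [0.2, 0.4]])
A2 = np.array([[-0.1, 0.0], [0.0, -0.1]])
A_list, k, p, H = [A1, A2], 2, 2, 10

# Method 1: top-left k x k block of companion-matrix powers
F = np.zeros((k * p, k * p))
F[:k, :] = np.concatenate(A_list, axis=1)
F[k:, :-k] = np.eye(k * (p - 1))
Psi_pow = [np.linalg.matrix_power(F, h)[:k, :k] for h in range(H + 1)]

# Method 2: the MA recursion
Psi_rec = [np.eye(k)]
for h in range(1, H + 1):
    Psi_rec.append(sum(A_list[i - 1] @ Psi_rec[h - i] for i in range(1, min(h, p) + 1)))

max_gap = max(np.max(np.abs(a - b)) for a, b in zip(Psi_pow, Psi_rec))
print(max_gap)  # ~0: both methods agree
```

For example, \(\Psi_2 = \mathbf{A}_1^2 + \mathbf{A}_2\), which is exactly the top-left block of \(\mathbf{F}^2\).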


def plot_irf_grid(Psi: np.ndarray, names: list[str], *, title: str) -> go.Figure:
    horizons = Psi.shape[0] - 1
    k = Psi.shape[1]
    h = np.arange(horizons + 1)

    subplot_titles = [f"{names[i]} ← {names[j]}" for i in range(k) for j in range(k)]  # response ← shock
    fig = make_subplots(rows=k, cols=k, shared_xaxes=True, subplot_titles=subplot_titles)

    for i in range(k):
        for j in range(k):
            fig.add_trace(
                go.Scatter(
                    x=h,
                    y=Psi[:, i, j],
                    mode="lines",
                    line=dict(width=2),
                    showlegend=False,
                ),
                row=i + 1,
                col=j + 1,
            )
            fig.add_trace(
                go.Scatter(
                    x=h,
                    y=np.zeros_like(h),
                    mode="lines",
                    line=dict(color="black", width=1, dash="dot"),
                    showlegend=False,
                ),
                row=i + 1,
                col=j + 1,
            )

    fig.update_layout(title=title, height=240 * k, width=240 * k)
    for row in range(1, k + 1):
        fig.update_yaxes(title_text="response", row=row, col=1)
    for col in range(1, k + 1):
        fig.update_xaxes(title_text="horizon", row=k, col=col)
    return fig


H = 20
Psi = irf_matrices(model["A"], horizons=H)
Psi_orth = orthogonalize_irf(Psi, Sigma_u=model["Sigma_u"])

plot_irf_grid(Psi_orth, names=names, title="Orthogonalized IRFs (Cholesky, order = column order)").show()

Takeaways#

  • VAR models multiple time series jointly and captures cross-dependencies via off-diagonal coefficients in \(\mathbf{A}_\ell\).

  • Multivariate stationarity is a stability condition: eigenvalues of the companion matrix must be inside the unit circle.

  • Lag selection trades bias/variance; AIC often prefers larger models, BIC is more conservative.

  • IRFs make dynamics interpretable; orthogonalization requires an identification assumption (ordering matters).